Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ Keys supported by image output:
* `dangling-name-prefix=[value]`: name image with `prefix@<digest>`, used for anonymous images
* `name-canonical=true`: add additional canonical name `name@<digest>`
* `compression=[uncompressed,gzip,estargz,zstd]`: choose compression type for layers newly created and cached, gzip is default value. estargz should be used with `oci-mediatypes=true`.
* `compression-level=[value]`: compression level for gzip, estargz (0-9) and zstd (0-22)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add the default value?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no universal default value. -1 is the default in gzip.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I see I made -1 match the default in zstd as well (although non-spec). But estargz might still want a different default #2591 (comment)

* `force-compression=true`: forcefully apply `compression` option to all layers (including already existing layers).
* `buildinfo=[all,imageconfig,metadata,none]`: choose [build dependency](docs/build-repro.md#build-dependencies) version to export (default `all`).

Expand Down
81 changes: 59 additions & 22 deletions cache/blobs.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cache

import (
"compress/gzip"
"context"
"fmt"
"io"
Expand Down Expand Up @@ -34,7 +35,7 @@ var ErrNoBlobs = errors.Errorf("no blobs for snapshot")
// a blob is missing and createIfNeeded is true, then the blob will be created, otherwise ErrNoBlobs will
// be returned. Caller must hold a lease when calling this function.
// If forceCompression is specified but the blob of compressionType doesn't exist, this function creates it.
func (sr *immutableRef) computeBlobChain(ctx context.Context, createIfNeeded bool, compressionType compression.Type, forceCompression bool, s session.Group) error {
func (sr *immutableRef) computeBlobChain(ctx context.Context, createIfNeeded bool, comp compression.Config, s session.Group) error {
if _, ok := leases.FromContext(ctx); !ok {
return errors.Errorf("missing lease requirement for computeBlobChain")
}
Expand All @@ -52,31 +53,31 @@ func (sr *immutableRef) computeBlobChain(ctx context.Context, createIfNeeded boo
// refs rather than every single layer present among their ancestors.
filter := sr.layerSet()

return computeBlobChain(ctx, sr, createIfNeeded, compressionType, forceCompression, s, filter)
return computeBlobChain(ctx, sr, createIfNeeded, comp, s, filter)
}

// compressor produces a WriteCloser that compresses data written to it into
// dest. requiredMediaType indicates the layer media type the stream must
// satisfy (presumably so implementations like eStargz can validate it —
// confirm against callers).
type compressor func(dest io.Writer, requiredMediaType string) (io.WriteCloser, error)

func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool, compressionType compression.Type, forceCompression bool, s session.Group, filter map[string]struct{}) error {
func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool, comp compression.Config, s session.Group, filter map[string]struct{}) error {
eg, ctx := errgroup.WithContext(ctx)
switch sr.kind() {
case Merge:
for _, parent := range sr.mergeParents {
parent := parent
eg.Go(func() error {
return computeBlobChain(ctx, parent, createIfNeeded, compressionType, forceCompression, s, filter)
return computeBlobChain(ctx, parent, createIfNeeded, comp, s, filter)
})
}
case Diff:
if _, ok := filter[sr.ID()]; !ok && sr.diffParents.upper != nil {
// This diff is just re-using the upper blob, compute that
eg.Go(func() error {
return computeBlobChain(ctx, sr.diffParents.upper, createIfNeeded, compressionType, forceCompression, s, filter)
return computeBlobChain(ctx, sr.diffParents.upper, createIfNeeded, comp, s, filter)
})
}
case Layer:
eg.Go(func() error {
return computeBlobChain(ctx, sr.layerParent, createIfNeeded, compressionType, forceCompression, s, filter)
return computeBlobChain(ctx, sr.layerParent, createIfNeeded, comp, s, filter)
})
}

Expand All @@ -93,19 +94,24 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
var mediaType string
var compressorFunc compressor
var finalize func(context.Context, content.Store) (map[string]string, error)
switch compressionType {
switch comp.Type {
case compression.Uncompressed:
mediaType = ocispecs.MediaTypeImageLayer
case compression.Gzip:
compressorFunc = func(dest io.Writer, _ string) (io.WriteCloser, error) {
return gzipWriter(comp)(dest)
}
mediaType = ocispecs.MediaTypeImageLayerGzip
case compression.EStargz:
compressorFunc, finalize = compressEStargz()
compressorFunc, finalize = compressEStargz(comp)
mediaType = ocispecs.MediaTypeImageLayerGzip
case compression.Zstd:
compressorFunc = zstdWriter
compressorFunc = func(dest io.Writer, _ string) (io.WriteCloser, error) {
return zstdWriter(comp)(dest)
}
mediaType = ocispecs.MediaTypeImageLayer + "+zstd"
default:
return nil, errors.Errorf("unknown layer compression type: %q", compressionType)
return nil, errors.Errorf("unknown layer compression type: %q", comp.Type)
}

var lowerRef *immutableRef
Expand Down Expand Up @@ -235,7 +241,7 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
return nil, errors.Errorf("unknown layer compression type")
}

if err := sr.setBlob(ctx, compressionType, desc); err != nil {
if err := sr.setBlob(ctx, comp.Type, desc); err != nil {
return nil, err
}
return nil, nil
Expand All @@ -244,9 +250,9 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
return err
}

if forceCompression {
if err := ensureCompression(ctx, sr, compressionType, s); err != nil {
return errors.Wrapf(err, "failed to ensure compression type of %q", compressionType)
if comp.Force {
if err := ensureCompression(ctx, sr, comp, s); err != nil {
return errors.Wrapf(err, "failed to ensure compression type of %q", comp.Type)
}
}
return nil
Expand Down Expand Up @@ -412,15 +418,15 @@ func isTypeWindows(sr *immutableRef) bool {
}

// ensureCompression ensures the specified ref has the blob of the specified compression Type.
func ensureCompression(ctx context.Context, ref *immutableRef, compressionType compression.Type, s session.Group) error {
_, err := g.Do(ctx, fmt.Sprintf("%s-%d", ref.ID(), compressionType), func(ctx context.Context) (interface{}, error) {
func ensureCompression(ctx context.Context, ref *immutableRef, comp compression.Config, s session.Group) error {
_, err := g.Do(ctx, fmt.Sprintf("%s-%d", ref.ID(), comp.Type), func(ctx context.Context) (interface{}, error) {
desc, err := ref.ociDesc(ctx, ref.descHandlers)
if err != nil {
return nil, err
}

// Resolve converters
layerConvertFunc, err := getConverter(ctx, ref.cm.ContentStore, desc, compressionType)
layerConvertFunc, err := getConverter(ctx, ref.cm.ContentStore, desc, comp)
if err != nil {
return nil, err
} else if layerConvertFunc == nil {
Expand All @@ -430,11 +436,11 @@ func ensureCompression(ctx context.Context, ref *immutableRef, compressionType c
// This ref can be used as the specified compressionType. Keep it lazy.
return nil, nil
}
return nil, ref.addCompressionBlob(ctx, desc, compressionType)
return nil, ref.addCompressionBlob(ctx, desc, comp.Type)
}

// First, lookup local content store
if _, err := ref.getCompressionBlob(ctx, compressionType); err == nil {
if _, err := ref.getCompressionBlob(ctx, comp.Type); err == nil {
return nil, nil // found the compression variant. no need to convert.
}

Expand All @@ -453,14 +459,45 @@ func ensureCompression(ctx context.Context, ref *immutableRef, compressionType c
}

// Start to track converted layer
if err := ref.addCompressionBlob(ctx, *newDesc, compressionType); err != nil {
if err := ref.addCompressionBlob(ctx, *newDesc, comp.Type); err != nil {
return nil, errors.Wrapf(err, "failed to add compression blob")
}
return nil, nil
})
return err
}

func zstdWriter(dest io.Writer, requiredMediaType string) (io.WriteCloser, error) {
return zstd.NewWriter(dest)
// gzipWriter returns a factory producing gzip compressors configured with the
// level from comp, or gzip's own default when no level was requested.
func gzipWriter(comp compression.Config) func(io.Writer) (io.WriteCloser, error) {
	return func(dest io.Writer) (io.WriteCloser, error) {
		// No explicit level configured: defer to the gzip package default.
		if comp.Level == nil {
			return gzip.NewWriterLevel(dest, gzip.DefaultCompression)
		}
		return gzip.NewWriterLevel(dest, *comp.Level)
	}
}

// zstdWriter returns a factory producing zstd compressors. A configured
// integer level in comp is mapped onto the go-zstd named encoder levels;
// otherwise the library's default speed/ratio trade-off is used.
func zstdWriter(comp compression.Config) func(io.Writer) (io.WriteCloser, error) {
	return func(dest io.Writer) (io.WriteCloser, error) {
		encLevel := zstd.SpeedDefault
		if l := comp.Level; l != nil {
			encLevel = toZstdEncoderLevel(*l)
		}
		return zstd.NewWriter(dest, zstd.WithEncoderLevel(encLevel))
	}
}

// toZstdEncoderLevel maps an integer zstd compression level (0-22 in the
// reference implementation) onto the coarse named levels exposed by go-zstd.
// Once we also have a C-based implementation, move this to a helper pkg.
func toZstdEncoderLevel(level int) zstd.EncoderLevel {
	switch {
	case level < 0:
		// Negative means "unspecified": use the library default.
		return zstd.SpeedDefault
	case level < 3:
		return zstd.SpeedFastest
	case level < 7:
		return zstd.SpeedDefault
	case level < 9:
		return zstd.SpeedBetterCompression
	default:
		return zstd.SpeedBestCompression
	}
}
15 changes: 0 additions & 15 deletions cache/blobs_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"context"
"io"

ctdcompression "github.com/containerd/containerd/archive/compression"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/mount"
Expand All @@ -33,20 +32,6 @@ func (sr *immutableRef) tryComputeOverlayBlob(ctx context.Context, lower, upper
return emptyDesc, false, nil
}

if compressorFunc == nil {
switch mediaType {
case ocispecs.MediaTypeImageLayer:
case ocispecs.MediaTypeImageLayerGzip:
compressorFunc = func(dest io.Writer, requiredMediaType string) (io.WriteCloser, error) {
return ctdcompression.CompressStream(dest, ctdcompression.Gzip)
}
case ocispecs.MediaTypeImageLayer + "+zstd":
compressorFunc = zstdWriter
default:
return emptyDesc, false, errors.Errorf("unsupported diff media type: %v", mediaType)
}
}

cw, err := sr.cm.ContentStore.Writer(ctx,
content.WithRef(ref),
content.WithDescriptor(ocispecs.Descriptor{
Expand Down
28 changes: 11 additions & 17 deletions cache/converter.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package cache

import (
"compress/gzip"
"context"
"fmt"
"io"
Expand All @@ -12,7 +11,6 @@ import (
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/images/converter"
"github.com/containerd/containerd/labels"
"github.com/klauspost/compress/zstd"
"github.com/moby/buildkit/identity"
"github.com/moby/buildkit/util/compression"
digest "github.com/opencontainers/go-digest"
Expand Down Expand Up @@ -57,15 +55,15 @@ func needsConversion(ctx context.Context, cs content.Store, desc ocispecs.Descri

// getConverter returns converter function according to the specified compression type.
// If no conversion is needed, this returns nil without error.
func getConverter(ctx context.Context, cs content.Store, desc ocispecs.Descriptor, compressionType compression.Type) (converter.ConvertFunc, error) {
if needs, err := needsConversion(ctx, cs, desc, compressionType); err != nil {
func getConverter(ctx context.Context, cs content.Store, desc ocispecs.Descriptor, comp compression.Config) (converter.ConvertFunc, error) {
if needs, err := needsConversion(ctx, cs, desc, comp.Type); err != nil {
return nil, errors.Wrapf(err, "failed to determine conversion needs")
} else if !needs {
// No conversion. No need to return an error here.
return nil, nil
}

c := conversion{target: compressionType}
c := conversion{target: comp}

from := compression.FromMediaType(desc.MediaType)
switch from {
Expand Down Expand Up @@ -96,31 +94,27 @@ func getConverter(ctx context.Context, cs content.Store, desc ocispecs.Descripto
return nil, errors.Errorf("unsupported source compression type %q from mediatype %q", from, desc.MediaType)
}

switch compressionType {
switch comp.Type {
case compression.Uncompressed:
case compression.Gzip:
c.compress = func(w io.Writer) (io.WriteCloser, error) {
return gzip.NewWriter(w), nil
}
c.compress = gzipWriter(comp)
case compression.Zstd:
c.compress = func(w io.Writer) (io.WriteCloser, error) {
return zstd.NewWriter(w)
}
c.compress = zstdWriter(comp)
case compression.EStargz:
compressorFunc, finalize := compressEStargz()
compressorFunc, finalize := compressEStargz(comp)
c.compress = func(w io.Writer) (io.WriteCloser, error) {
return compressorFunc(w, ocispecs.MediaTypeImageLayerGzip)
}
c.finalize = finalize
default:
return nil, errors.Errorf("unknown target compression type during conversion: %q", compressionType)
return nil, errors.Errorf("unknown target compression type during conversion: %q", comp.Type)
}

return (&c).convert, nil
}

type conversion struct {
target compression.Type
target compression.Config
decompress func(context.Context, ocispecs.Descriptor) (io.ReadCloser, error)
compress func(w io.Writer) (io.WriteCloser, error)
finalize func(context.Context, content.Store) (map[string]string, error)
Expand All @@ -129,7 +123,7 @@ type conversion struct {
func (c *conversion) convert(ctx context.Context, cs content.Store, desc ocispecs.Descriptor) (*ocispecs.Descriptor, error) {
// prepare the source and destination
labelz := make(map[string]string)
ref := fmt.Sprintf("convert-from-%s-to-%s-%s", desc.Digest, c.target.String(), identity.NewID())
ref := fmt.Sprintf("convert-from-%s-to-%s-%s", desc.Digest, c.target.Type.String(), identity.NewID())
w, err := cs.Writer(ctx, content.WithRef(ref))
if err != nil {
return nil, err
Expand Down Expand Up @@ -188,7 +182,7 @@ func (c *conversion) convert(ctx context.Context, cs content.Store, desc ocispec
}

newDesc := desc
newDesc.MediaType = c.target.DefaultMediaType()
newDesc.MediaType = c.target.Type.DefaultMediaType()
newDesc.Digest = info.Digest
newDesc.Size = info.Size
newDesc.Annotations = map[string]string{labels.LabelUncompressed: diffID.Digest().String()}
Expand Down
9 changes: 7 additions & 2 deletions cache/estargz.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cache

import (
"archive/tar"
"compress/gzip"
"context"
"fmt"
"io"
Expand All @@ -21,7 +22,7 @@ var eStargzAnnotations = []string{estargz.TOCJSONDigestAnnotation, estargz.Store

// compressEStargz writes the passed blobs stream as an eStargz-compressed blob.
// finalize function finalizes the written blob metadata and returns all eStargz annotations.
func compressEStargz() (compressorFunc compressor, finalize func(context.Context, content.Store) (map[string]string, error)) {
func compressEStargz(comp compression.Config) (compressorFunc compressor, finalize func(context.Context, content.Store) (map[string]string, error)) {
var cInfo *compressionInfo
var writeErr error
var mu sync.Mutex
Expand All @@ -43,7 +44,11 @@ func compressEStargz() (compressorFunc compressor, finalize func(context.Context

blobInfoW, bInfoCh := calculateBlobInfo()
defer blobInfoW.Close()
w := estargz.NewWriter(io.MultiWriter(dest, blobInfoW))
level := gzip.BestCompression
if comp.Level != nil {
level = *comp.Level
}
w := estargz.NewWriterLevel(io.MultiWriter(dest, blobInfoW), level)

// Using lossless API here to make sure that decompressEStargz provides the exact
// same tar as the original.
Expand Down
Loading